Decision Trees

Problem Statement
The input data is the iris dataset.
It contains measurements of flower samples:
for each sample, the petal and sepal length and width are
recorded along with the type of the flower.
We use this dataset to build a decision tree model that
predicts the type of flower from the petal and sepal measurements.
Techniques used:
1. Decision Trees
2. Training and Testing
3. Confusion Matrix
# -*- coding: utf-8 -*-

import os

os.chdir("/home/cloudops/spark")
os.getcwd()   # verify the working directory
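
# NOTE: 'sc' (the SparkContext) is predefined only in the pyspark shell or a
# notebook. A minimal sketch for running this as a standalone script
# (the appName is illustrative):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("IrisDecisionTree").getOrCreate()
sc = spark.sparkContext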

# Load the CSV file into an RDD
irisData = sc.textFile("data/iris.csv")
# ERROR if no SparkContext is available (e.g. outside the pyspark shell and
# without the SparkSession created above): name 'sc' is not defined
irisData.persist()

# Remove the first line (contains headers)
dataLines = irisData.filter(lambda x: "Sepal" not in x)
dataLines.count()     # 150

# Convert each line of the RDD into a numeric DenseVector.
# As part of this step:
# 1. Change the string labels to numeric codes

# NOTE: the Statistics step below needs pyspark.mllib.linalg Vectors, while the
# DataFrame/classifier steps need pyspark.ml.linalg Vectors
# from pyspark.mllib.linalg import Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

def transformToNumeric(inputStr):

    attList = inputStr.split(",")

    # Set default to setosa
    irisValue = 1.0
    if attList[4] == "versicolor":
        irisValue = 2.0
    if attList[4] == "virginica":
        irisValue = 3.0

    # Keep the label plus all four measurements at this stage
    values = Vectors.dense([irisValue,
                            float(attList[0]),
                            float(attList[1]),
                            float(attList[2]),
                            float(attList[3])])
    return values

# Change to a Vector
irisVectors = dataLines.map(transformToNumeric)
irisVectors.collect()
# [DenseVector([1.0, 5.1, 3.5, 1.4, 0.2]),
# DenseVector([1.0, 4.9, 3.0, 1.4, 0.2]),
# DenseVector([1.0, 4.7, 3.2, 1.3, 0.2]),
# . . .

# =====================================
# Perform Statistical Analysis
# =====================================
from pyspark.mllib.stat import Statistics

irisStats = Statistics.colStats(irisVectors)
# ERROR:
# Cannot convert type <class 'pyspark.ml.linalg.DenseVector'> into Vector
# pyspark.mllib.stat.Statistics only accepts MLlib vectors, i.e.:
# from pyspark.mllib.linalg import Vectors
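
# Workaround sketch: convert the ML vectors to MLlib vectors before computing
# the column statistics (variable names here are illustrative)
from pyspark.mllib.linalg import Vectors as MLLibVectors

mllibVectors = irisVectors.map(lambda v: MLLibVectors.dense(v.toArray()))
irisStats = Statistics.colStats(mllibVectors)
# (Statistics.corr below has the same MLlib-vector requirement)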

irisStats.mean()
# array([2.        , 5.84333333, 3.05733333, 3.758     , 1.19933333])
irisStats.variance()
# array([0.67114094, 0.68569351, 0.18997942, 3.11627785, 0.58100626])
irisStats.min()
# array([1. , 4.3, 2. , 1. , 0.1])
irisStats.max()
# array([3. , 7.9, 4.4, 6.9, 2.5])
Statistics.corr(irisVectors)
# array([[ 1.        ,  0.78256123, -0.42665756,  0.9490347 ,  0.95654733],
#       [ 0.78256123,  1.        , -0.11756978,  0.87175378,  0.81794113],
#       [-0.42665756, -0.11756978,  1.        , -0.4284401 , -0.36612593],
#       [ 0.9490347 ,  0.87175378, -0.4284401 ,  1.        ,  0.96286543],
#       [ 0.95654733,  0.81794113, -0.36612593,  0.96286543,  1.        ]])
# => Drop the Sepal Width attribute (index 2 above): it has the weakest
#    correlation with the label (about -0.43)

# =====================================
# Transform the RDD into (label, features) pairs
# Drop the column that is not required (low correlation)
# =====================================
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

def transformToLabeledPoint(inStr):
    attList = inStr.split(",")
    lp = (attList[4], \
          Vectors.dense([attList[0], attList[2], attList[3]]))
    return lp

# Transform the RDD into (label, features) pairs
irisLp = dataLines.map(transformToLabeledPoint)

# Convert the (label, features) pairs to a DataFrame for input to Machine Learning
irisDF = sqlContext.createDataFrame(irisLp,["label", "features"])

# Show schema
irisDF.printSchema()
# root
# |-- label: string (nullable = true)
# |-- features: vector (nullable = true)

# Show DF
irisDF.select("label", "features").show(10)
# +------+-------------+
# | label|     features|
# +------+-------------+
# |setosa|[5.1,1.4,0.2]|
# |setosa|[4.9,1.4,0.2]|
# |setosa|[4.7,1.3,0.2]|
# |setosa|[4.6,1.5,0.2]|
# |setosa|[5.0,1.4,0.2]|
# |setosa|[5.4,1.7,0.4]|
# |setosa|[4.6,1.4,0.3]|
# |setosa|[5.0,1.5,0.2]|
# |setosa|[4.4,1.4,0.2]|
# |setosa|[4.9,1.5,0.1]|
# +------+-------------+

# =====================================
# Find Correlations
# =====================================
numFeatures = irisDF.take(1)[0].features.size
print(numFeatures)   # 3

# labelRDD = irisDF.map(lambda lp: lp.label)
# ERROR: 'DataFrame' object has no attribute 'map'
labelRDD = irisDF.rdd.map(lambda lp: lp.label)

for i in range(numFeatures):
    featureRDD = irisDF.rdd.map(lambda lp: lp.features[i])
    corr = Statistics.corr(labelRDD, featureRDD, 'pearson')
    print('%d\t%g' % (i, corr))
# ERROR:
# could not convert string to float: 'setosa'
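
# Workaround sketch: Pearson correlation needs numeric series, so map the
# string labels to numbers first (the dict below is illustrative; the
# StringIndexer in the next step is the more idiomatic fix)
labelToNum = {"setosa": 1.0, "versicolor": 2.0, "virginica": 3.0}
numericLabelRDD = irisDF.rdd.map(lambda lp: labelToNum[lp.label])

for i in range(numFeatures):
    featureRDD = irisDF.rdd.map(lambda lp: lp.features[i])
    corr = Statistics.corr(numericLabelRDD, featureRDD, 'pearson')
    print('%d\t%g' % (i, corr))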

# =====================================
# Indexing needed as a prerequisite for Decision Trees
# Convert the string label to a numeric index - adds a new column to the DataFrame
# =====================================
from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol="label", outputCol="lbl_indexed")
si_model = stringIndexer.fit(irisDF)
dt = si_model.transform(irisDF)
dt.collect()
# [Row(label='setosa', features=DenseVector([5.1, 1.4, 0.2]), lbl_indexed=2.0),
# Row(label='setosa', features=DenseVector([4.9, 1.4, 0.2]), lbl_indexed=2.0),
# Row(label='setosa', features=DenseVector([4.7, 1.3, 0.2]), lbl_indexed=2.0),
# . . .
# Row(label='virginica', features=DenseVector([6.2, 5.4, 2.3]), lbl_indexed=1.0),
# Row(label='virginica', features=DenseVector([5.9, 5.1, 1.8]), lbl_indexed=1.0)]
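
# A sketch for mapping index values back to flower names later on, using the
# labels learned by the StringIndexer above (IndexToString is its companion
# transformer in pyspark.ml.feature):
from pyspark.ml.feature import IndexToString

labelConverter = IndexToString(inputCol="lbl_indexed", outputCol="lbl_original",
                               labels=si_model.labels)
labelConverter.transform(dt).select("label", "lbl_indexed", "lbl_original").show(3)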

# =====================================
# Split into Training and Testing Data
# =====================================
(trainingData, testData) = dt.randomSplit([0.9, 0.1])
trainingData.count()   # 135
testData.count()       # 15

testData.collect()
# [Row(label='setosa', features=DenseVector([4.8, 1.4, 0.1]), lbl_indexed=2.0),
# Row(label='setosa', features=DenseVector([4.8, 1.9, 0.2]), lbl_indexed=2.0),
# . . .
# Row(label='virginica', features=DenseVector([7.2, 6.0, 1.8]), lbl_indexed=1.0),
# Row(label='virginica', features=DenseVector([7.9, 6.4, 2.0]), lbl_indexed=1.0)]

# =====================================
# Create the Decision Tree model
# =====================================
from pyspark.ml.classification import DecisionTreeClassifier

dtClassifier = DecisionTreeClassifier(maxDepth=2, labelCol="lbl_indexed")

dtModel = dtClassifier.fit(trainingData)
# ERROR:
# IllegalArgumentException: 'requirement failed: Column features must be of type
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'
# Works for Vectors from pyspark.ml, but not from pyspark.mllib

dtModel.numNodes   # 5
dtModel.depth      # 2, maxDepth=2 above
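
# Optional: print the learned split rules (toDebugString is available on the
# fitted pyspark.ml decision tree model)
print(dtModel.toDebugString)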

# =====================================
# Predict with the Model
# =====================================
# Create a new "prediction" column with a predicted class for each row
# NOTE: the rows scored here come from trainingData; a testData sketch follows below
predictions = dtModel.transform(trainingData)

predictions.select("prediction", "lbl_indexed", "label", "features").collect()
# [Row(prediction=2.0, lbl_indexed=2.0, label='setosa', features=DenseVector([4.3, 1.1, 0.1])),
# Row(prediction=2.0, lbl_indexed=2.0, label='setosa', features=DenseVector([4.4, 1.3, 0.2])),
# Row(prediction=2.0, lbl_indexed=2.0, label='setosa', features=DenseVector([4.4, 1.3, 0.2])),
# . . .
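
# The transform above scores the training set; a quick sketch of the same step
# on the held-out rows (the numbers below were computed on trainingData, so a
# test-set evaluation will differ slightly):
testPredictions = dtModel.transform(testData)
testPredictions.select("prediction", "lbl_indexed", "label").show(5)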

# =====================================
# Evaluate the Result
# =====================================
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction", \
                    labelCol = "lbl_indexed", \
                    metricName = "accuracy")
# NOTE: older examples use metricName="precision", which was deprecated in
# MulticlassClassificationEvaluator; "accuracy" replaces it

evaluator.evaluate(predictions)      # 0.9552238805970149
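
# A sketch of the other multiclass metrics supported by the same evaluator
# ("f1", "weightedPrecision", "weightedRecall" are valid metricName values):
for metric in ("f1", "weightedPrecision", "weightedRecall"):
    print(metric, evaluator.setMetricName(metric).evaluate(predictions))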

# =====================================
# Draw a Confusion Matrix
# =====================================
labelList = predictions.select("lbl_indexed","label").distinct().toPandas()
# ERROR: toPandas() requires a newer pandas version (> 0.19.3)
# Workaround:
#   pip3 uninstall pandas   # was not installed
#   pip3 uninstall numpy
#   pip3 install pandas
#   pip3 install numpy

predictions.groupBy("lbl_indexed", "prediction").count().show()
# +-----------+----------+-----+
# |lbl_indexed|prediction|count|
# +-----------+----------+-----+
# |        1.0|       1.0|   45|
# |        0.0|       1.0|    2|  <---
# |        2.0|       2.0|   46|
# |        1.0|       0.0|    2|  <---
# |        0.0|       0.0|   41|
# +-----------+----------+-----+
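
# A sketch of the same counts as a proper confusion matrix table, assuming a
# working pandas install (rows = actual class index, columns = predicted):
import pandas as pd

predPdf = predictions.select("lbl_indexed", "prediction").toPandas()
print(pd.crosstab(predPdf["lbl_indexed"], predPdf["prediction"],
                  rownames=["actual"], colnames=["predicted"]))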